package com.zusmart.base.network.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

import com.zusmart.base.activity.ActivityUtils;
import com.zusmart.base.buffer.BufferAllocator;
import com.zusmart.base.buffer.support.DefaultBufferAllocator;
import com.zusmart.base.logging.Logger;
import com.zusmart.base.logging.LoggerFactory;
import com.zusmart.base.network.ChannelAdapter;
import com.zusmart.base.network.ChannelContextManager;
import com.zusmart.base.network.ChannelOption;
import com.zusmart.base.network.ChannelUtils;
import com.zusmart.base.network.support.AbstractChannelAcceptor;
import com.zusmart.base.thread.FastThread;
import com.zusmart.base.toolkit.NetAddress;
import com.zusmart.base.util.Assert;

public class NioChannelAcceptor extends AbstractChannelAcceptor implements Runnable {

	private static final Logger logger = LoggerFactory.getLogger(NioChannelAcceptor.class);

	private FastThread monitor;
	private NetAddress bindAddress;

	private Selector selector;
	private ServerSocketChannel channel;
	private BufferAllocator bufferAllocator;
	private NioChannelContextManager channelContextManager;
	private NioChannelEventLoopGroup channelEventLoopGroup;

	private volatile boolean running = false;

	public NioChannelAcceptor(NetAddress bindAddress, ChannelOption channelOption, ChannelAdapter channelAdapter) {
		super(channelOption, channelAdapter);
		Assert.isNull(bindAddress, "bind address must not be null");
		this.bindAddress = bindAddress;
	}

	@Override
	public boolean isOpen() {
		return null != this.channel && this.channel.isOpen();
	}

	@Override
	public NetAddress getServerAddress() {
		return this.bindAddress;
	}

	@Override
	public ChannelContextManager getChannelContextManager() {
		return this.channelContextManager;
	}

	@Override
	protected void doStart() throws Exception {
		ChannelOption option = this.getChannelOption();
		this.monitor = new FastThread(this, option.getNioAcceptorName());
		this.bufferAllocator = new DefaultBufferAllocator();
		this.channelEventLoopGroup = new NioChannelEventLoopGroup(this.getChannelOption(),this.bufferAllocator);
		this.channelContextManager = new NioChannelContextManager(true,this.getChannelAdapter(), this.channelEventLoopGroup);
		this.selector = Selector.open();
		this.channel = ServerSocketChannel.open();
		this.channel.socket().setSoTimeout(option.getSoTimeout());
		this.channel.socket().setReuseAddress(option.isReuseAddress());
		this.channel.socket().bind(this.bindAddress.asSocketAddress(), option.getBackLog());
		this.channel.configureBlocking(false);
		this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
		this.bindAddress = NetAddress.create((InetSocketAddress) this.channel.socket().getLocalSocketAddress());
		this.running = true;
		this.channelContextManager.start();
		this.channelEventLoopGroup.start();
		this.monitor.start();
	}

	@Override
	protected void doClose() throws Exception {
		this.running = false;
		this.selector.wakeup();
	}

	@Override
	public void run() {
		final Selector s = this.selector;
		final ServerSocketChannel c = this.channel;
		while (this.running) {

			int selected = 0;
			try {
				selected = s.select(5000);
			} catch (IOException e) {
				logger.error(e.getMessage(), e);
			}

			if (selected > 0) {
				Set<SelectionKey> keys = s.selectedKeys();
				for (SelectionKey key : keys) {
					if (key.isValid() && key.isAcceptable()) {
						try {
							this.accept(c.accept());
						} catch (IOException e) {
							ChannelUtils.cancel(key);
							logger.error(e.getMessage(), e);
						}
					} else {
						ChannelUtils.cancel(key);
					}
				}
				keys.clear();
			}

		}
		ChannelUtils.close(this.selector);
		ChannelUtils.close(this.channel);
		ActivityUtils.close(this.channelContextManager);
		ActivityUtils.close(this.channelEventLoopGroup);
	}

	protected void accept(SocketChannel socketChannel) throws IOException {
		NioChannelContext channelContext = this.channelContextManager.createChannelContext(socketChannel);
		channelContext.getNioChannelEventLoop().doRegister(channelContext);
	}

}